package com.leon.hom.mqtt;

import com.leon.hom.core.config.BrokerConfig;
import com.leon.hom.core.log.Loggers;
import com.leon.hom.mqtt.callback.ReceiveMqttCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.UUID;

public class ReceiveMqttClient {

    /**
     * 设置超时时间
     */
    Integer connectionTimeout = 10;
    /**
     * 会话心跳时间
     */
    Integer keepAliveInterval = 30;

    /**
     * 客户端数组
     */
    MqttClient mqttClient;

    public ReceiveMqttClient(BrokerConfig brokerConfig, String topic, ReceiveMqttCallback callback) throws Exception {

        MqttClient client = new MqttClient(brokerConfig.getUrl(), UUID.randomUUID().toString(), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // 如果想要断线这段时间的数据，要设置成false，并且重连后不用再次订阅，否则不会得到断线时间的数据
        options.setCleanSession(true);
        // 设置连接的用户名
        options.setUserName(brokerConfig.getUsername());
        // 设置连接的密码
        options.setPassword(brokerConfig.getPassword().toCharArray());
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(connectionTimeout);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线，但这个方法并没有重连的机制
        options.setKeepAliveInterval(keepAliveInterval);
        // 连接服务器
        client.connect(options);
        // 订阅
        client.subscribe(topic, brokerConfig.getQos());
        // 回调
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                try {
                    mqttClient.reconnect();
                } catch (MqttException e) {
                    Loggers.MQTT.error(e.getMessage());
                }
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                callback.handle(topic, message);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });
        this.mqttClient = client;
        Loggers.MQTT.info("Connect MQTT broker: {} , Qos: {} , subscribe topic: {}", brokerConfig.getUrl(), brokerConfig.getQos(), topic);
    }

    public void close() throws MqttException {
        this.mqttClient.close();
    }

}
